SAMZA-2561: Add job features to MetricsHeader#1402
PawasChhokra wants to merge 1 commit into apache:master
Sanil15 left a comment
Thanks for the changes. Minor comments below.
package org.apache.samza.metrics;

public enum ApiType {
nit: wouldn't SamzaAPI be a better name?
public static String getApiType(Config config) {
  ApplicationConfig appConfig = new ApplicationConfig(config);
  String appClass = appConfig.getAppClass();
  if (appClass == null || appClass.isEmpty()) {
Is this ever true? Perhaps for legacy task-class applications?
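To make the question concrete, here is a minimal sketch of the branch being discussed. It assumes that a legacy job sets only `task.class` and leaves `app.class` unset. A plain `Map` stands in for Samza's `Config`, and the `ApiKind` constants are hypothetical names, since the values of the PR's `ApiType` enum are not shown in this excerpt.

```java
import java.util.Map;

final class ApiTypeSketch {
  // Hypothetical constants; the PR's actual ApiType values are not shown here.
  enum ApiKind { APPLICATION_CLASS, LEGACY_TASK_CLASS }

  // Assumed config key, standing in for ApplicationConfig#getAppClass.
  static final String APP_CLASS_KEY = "app.class";

  // Returns LEGACY_TASK_CLASS when no application class is configured.
  static ApiKind classify(Map<String, String> config) {
    String appClass = config.get(APP_CLASS_KEY);
    if (appClass == null || appClass.isEmpty()) {
      return ApiKind.LEGACY_TASK_CLASS;
    }
    return ApiKind.APPLICATION_CLASS;
  }

  public static void main(String[] args) {
    System.out.println(classify(Map.of("task.class", "com.example.MyTask")));
    System.out.println(classify(Map.of("app.class", "com.example.MyApp")));
  }
}
```

If that assumption holds, the null-or-empty check is reachable and a unit test with a config that has no `app.class` would cover it.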
import org.slf4j.LoggerFactory;


public class Util {
Not related to your change, but JobInfoUtil would be a better name for this class than just Util.
private int numPersistentStores = 2;
private int containerNumCores = 2;
private boolean autosizingEnabled = false;
private String deploymentType = "test deployment type";
Consider using valid values for the deployment type and API type here, so that the util functions you just added are unit tested.
Assert.assertEquals(metricsSnapshot.getHeader().getSamzaVersion(), samzaVersion);
Assert.assertEquals(metricsSnapshot.getHeader().getHost(), hostname);
Assert.assertEquals(metricsSnapshot.getHeader().getSource(), DiagnosticsManager.class.getName());

nit: revert whitespace change
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.samza.metrics;

public enum DeploymentType {
  YARN, STANDALONE
}
Single-line enums like these can likely be moved into the MetricsHeader class, since that seems to be the only place they're used, no?
Same for the API type above.
public static int getContainerCount(Config config) {
  JobConfig jobConfig = new JobConfig(config);
  return jobConfig.getContainerCount();
}
Why even have it as a util?
Could the caller not wrap the config object in JobConfig, TaskConfig and ClusterManagerConfig once and invoke the respective getters, rather than wrapping it again in each separate util method?
Same for getContainerMemoryMb, getNumCores, getThreadPoolSize, getSspGrouperFactory, getHostAffinityEnabled, getContainerRetryCount, getContainerRetryWindowMs, getMaxConcurrency.
Then none of these "util" methods would need to exist.
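A rough sketch of what that could look like at the call site. `JobView` and `ClusterView` are hypothetical stand-ins for Samza's `JobConfig` and `ClusterManagerConfig`, a plain `Map` stands in for `Config`, and the keys and defaults used here are assumptions for illustration only.

```java
import java.util.Map;

final class ConfigOnceSketch {
  // Hypothetical stand-in for JobConfig: a typed view over the raw config.
  static final class JobView {
    private final Map<String, String> config;
    JobView(Map<String, String> config) { this.config = config; }
    int getContainerCount() {
      // Key and default are assumed for this sketch.
      return Integer.parseInt(config.getOrDefault("job.container.count", "1"));
    }
  }

  // Hypothetical stand-in for ClusterManagerConfig.
  static final class ClusterView {
    private final Map<String, String> config;
    ClusterView(Map<String, String> config) { this.config = config; }
    int getContainerMemoryMb() {
      return Integer.parseInt(config.getOrDefault("cluster-manager.container.memory.mb", "1024"));
    }
    int getNumCores() {
      return Integer.parseInt(config.getOrDefault("cluster-manager.container.cpu.cores", "1"));
    }
  }

  // The caller builds each view once and reads every field from it,
  // so no per-field static helper is needed.
  static String describe(Map<String, String> config) {
    JobView job = new JobView(config);
    ClusterView cluster = new ClusterView(config);
    return job.getContainerCount() + " containers, "
        + cluster.getContainerMemoryMb() + " MB, "
        + cluster.getNumCores() + " cores";
  }

  public static void main(String[] args) {
    System.out.println(describe(Map.of("job.container.count", "4")));
  }
}
```

With this shape, adding another header field means one more getter call on an existing view rather than a new static method that rebuilds the view.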
public static int getMaxJvmHeapMb() {
  Long maxJvmHeapMb = Runtime.getRuntime().maxMemory() / (1024 * 1024);
  return maxJvmHeapMb.intValue();
}
Can DiagnosticsUtil.buildDiagnosticsManager also invoke this util method, instead of calling Runtime.getRuntime().maxMemory() directly?
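If the helper becomes the single place where the heap size is computed, it may be worth making it primitive-only and overflow-safe. A minimal sketch, not the PR's code; the class and method names are hypothetical:

```java
final class JvmHeapSketch {
  private static final long BYTES_PER_MB = 1024L * 1024L;

  // Max heap the JVM will attempt to use, in whole megabytes.
  static int maxJvmHeapMb() {
    long maxBytes = Runtime.getRuntime().maxMemory();
    // maxMemory() returns Long.MAX_VALUE when there is no inherent limit,
    // so clamp before narrowing to int to avoid a wrapped, negative result.
    return (int) Math.min(maxBytes / BYTES_PER_MB, Integer.MAX_VALUE);
  }

  public static void main(String[] args) {
    System.out.println("max heap MB: " + maxJvmHeapMb());
  }
}
```

Both the container and DiagnosticsUtil.buildDiagnosticsManager could then call the same method, which keeps the two reported values consistent.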
Duration terminationDuration, boolean autosizingEnabled) {
Duration terminationDuration,
boolean autosizingEnabled,
String deploymentType,
Should deploymentType and apiType be enums, since we defined enums for them above?
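A sketch of the enum-typed alternative, under the assumption that the serialized header can still carry the value as a string. `DeploymentType` mirrors the enum in this PR; `Header`, `deploymentTypeName` and `parse` are hypothetical names used only for illustration.

```java
import java.util.Locale;
import java.util.Objects;

final class TypedHeaderSketch {
  // Mirrors the DeploymentType enum added in this PR.
  enum DeploymentType { YARN, STANDALONE }

  // Hypothetical, trimmed-down header that holds the enum instead of a String.
  static final class Header {
    private final DeploymentType deploymentType;

    Header(DeploymentType deploymentType) {
      this.deploymentType = Objects.requireNonNull(deploymentType, "deploymentType");
    }

    DeploymentType getDeploymentType() { return deploymentType; }

    // String form for serialization; callers never pass free-form text in.
    String deploymentTypeName() { return deploymentType.name(); }
  }

  // Parses external text into the enum; throws IllegalArgumentException on unknown values.
  static DeploymentType parse(String raw) {
    return DeploymentType.valueOf(raw.trim().toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    System.out.println(new Header(parse("yarn")).deploymentTypeName());
  }
}
```

With an enum parameter, a test value such as "test deployment type" no longer compiles as an argument, which also addresses the earlier comment about using valid values in the unit tests.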
metricsSnapshot.getHeader().getContainerThreadPoolSize(), metricsSnapshot.getHeader().getHostAffinity(),
metricsSnapshot.getHeader().getSspGrouper(), metricsSnapshot.getHeader().getMaxContainerRetryCount(),
metricsSnapshot.getHeader().getContainerRetryWindowMs(), metricsSnapshot.getHeader().getTaskMaxConcurrency(),
metricsSnapshot.getHeader().getMaxJvmHeapMb());
Can we add a new separate constructor for DiagnosticsStreamMessage that takes a metricsHeader as input?
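One possible shape for that, sketched with hypothetical stand-in classes (`Header` and `Message` below are not the real MetricsHeader or DiagnosticsStreamMessage, and the three fields are a small illustrative subset):

```java
import java.util.Objects;

final class HeaderConstructorSketch {
  // Hypothetical stand-in for MetricsHeader with only a few fields.
  static final class Header {
    final String jobName;
    final int containerCount;
    final int maxJvmHeapMb;

    Header(String jobName, int containerCount, int maxJvmHeapMb) {
      this.jobName = jobName;
      this.containerCount = containerCount;
      this.maxJvmHeapMb = maxJvmHeapMb;
    }
  }

  // Hypothetical stand-in for DiagnosticsStreamMessage.
  static final class Message {
    private final Header header;

    // Field-by-field constructor kept for existing callers; it delegates.
    Message(String jobName, int containerCount, int maxJvmHeapMb) {
      this(new Header(jobName, containerCount, maxJvmHeapMb));
    }

    // New constructor: takes the header as a whole, so new header fields
    // do not change this signature.
    Message(Header header) {
      this.header = Objects.requireNonNull(header, "header");
    }

    Header getHeader() { return header; }
  }

  public static void main(String[] args) {
    Header header = new Header("my-job", 4, 512);
    System.out.println(new Message(header).getHeader().containerCount);
  }
}
```

The call site quoted above would then pass `metricsSnapshot.getHeader()` directly instead of unpacking every getter.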
serde,
blacklist,
Util.getDeploymentType(config),
Util.getApiType(config),
Util.getContainerCount(config),
Util.getContainerMemoryMb(config),
Util.getNumCores(config),
Util.getThreadPoolSize(config),
Util.getHostAffinityEnabled(config),
Util.getSspGrouperFactory(config),
Util.getContainerRetryCount(config),
Util.getContainerRetryWindowMs(config),
Util.getMaxConcurrency(config),
Util.getMaxJvmHeapMb)
Please maintain the existing ordering: pass in the parameters first, then the serde, and then the blacklist.
Similarly for MetricsHeader above, where the time fields are passed in last.
private int containerNumCores = 2;
private boolean autosizingEnabled = false;
private String deploymentType = "test deployment type";
private String apiType = "test api type";
rmatharu-zz left a comment
Took an initial pass; this requires some simplification and cleanup.
Since a majority of the parameters are derived from config, in the interest of easy future extensibility, it'd be better to simply emit the entire config object once, from DiagnosticsManager at container startup.
public class Util {
  private static final Logger LOG = LoggerFactory.getLogger(Util.class);
  private static final String YARN_JOB_FACTORY_CLASS = "org.apache.samza.job.yarn.YarnJobFactory";
  private static final String BEAM_RUNNER_CLASS = "org.apache.beam.runners.samza.SamzaRunner";
Issues: Currently, the MetricsHeader object emitted by the SamzaContainer does not include basic job-level information.
Changes: Added a few features of the job to be emitted by the MetricsHeader.
API Changes: With this change, the MetricsHeader class will emit other properties of the job like number of containers used, number of cores used, etc.
Upgrade instructions: None
Usage instructions: None
Tests: Modified existing tests to work with this change.